
Conversation

@zhzhan
Contributor

@zhzhan zhzhan commented Oct 19, 2016

What changes were proposed in this pull request?

Restructure the code and implement two new task assigners.
PackedAssigner: tries to allocate tasks to the executors with the fewest available cores, so that Spark can release reserved executors when dynamic allocation is enabled.

BalancedAssigner: tries to allocate tasks to the executors with the most available cores, in order to balance the workload across all executors.

By default, the original round-robin assigner is used.

We tested a pipeline, and the new PackedAssigner saved around 45% of the reserved CPU and memory with dynamic allocation enabled.
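
For intuition, a minimal sketch of how the two strategies order worker offers (not the PR's actual code; Offer and the helper names are illustrative assumptions):

case class Offer(executorId: String, coresAvailable: Int)

// Packed: fewest available cores first, so busy executors fill up and
// idle executors can be released by dynamic allocation.
def packedOrder(offers: Seq[Offer]): Seq[Offer] =
  offers.sortBy(_.coresAvailable)

// Balanced: most available cores first, spreading load evenly.
def balancedOrder(offers: Seq[Offer]): Seq[Offer] =
  offers.sortBy(o => -o.coresAvailable)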

How was this patch tested?

Both unit tests in TaskSchedulerImplSuite and manual tests in a production pipeline.

@SparkQA

SparkQA commented Oct 19, 2016

Test build #67157 has finished for PR 15541 at commit 75cdd1a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class OfferState(val workOffer: WorkerOffer)
    • class RoundRobinAssigner extends TaskAssigner
    • class BalancedAssigner extends TaskAssigner
    • class PackedAssigner extends TaskAssigner

@zhzhan
Contributor Author

zhzhan commented Oct 19, 2016

@rxin @gatorsmile Can you please take a look and kindly provide your comments?

def init(conf: SparkConf): TaskAssigner = {
val assignerName = conf.get(config.SPARK_SCHEDULER_TASK_ASSIGNER.key, "roundrobin")
.toLowerCase()
val className = assignerMap.getOrElse(assignerName, roundrobin)
Member

@gatorsmile gatorsmile Oct 19, 2016

How about this?

val assignerName = conf.get(config.SPARK_SCHEDULER_TASK_ASSIGNER.key, "roundrobin")
val className = assignerMap.getOrElse(assignerName.toLowerCase, roundrobin)

Member

Put a log info or warning when the given assignerName is not valid, instead of silently falling back to the default one.
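
A minimal sketch of the suggested warning (an assumption; a later revision in this PR adds a similar logWarning):

val className = assignerMap.getOrElse(assignerName.toLowerCase, {
  // Warn instead of silently defaulting (hypothetical message text).
  logWarning(s"Unknown task assigner '$assignerName'; falling back to $roundrobin.")
  roundrobin
})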

val assigner = try {
logInfo(s"Constructing an assigner as $className")
Utils.classForName(className).getConstructor()
.newInstance().asInstanceOf[TaskAssigner]
Member

one line?

override def hasNext: Boolean = idx < sorted.size

override def getNext(): OfferState = {
currentOffer = sorted(idx)
Member

@gatorsmile gatorsmile Oct 19, 2016

If I just read this function, my first question is how we can ensure the index will not go out of bounds. We need to leave a comment to explain this, or add a safety check to avoid any bug we might introduce in the future.

Contributor Author

Will change it to be similar to the Iterator.next() method, and add comments similar to those on Iterator.next().
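
A hedged sketch of that change (the exact shape is an assumption, not the PR's final code), reusing the sorted, idx, and currentOffer fields quoted above:

import java.util.NoSuchElementException

override def getNext(): OfferState = {
  // Mirror Iterator.next(): fail fast rather than index out of bounds.
  if (!hasNext) throw new NoSuchElementException("No more qualified offers")
  currentOffer = sorted(idx)
  idx += 1
  currentOffer
}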

@@ -60,7 +58,7 @@ private[spark] class TaskSchedulerImpl(
def this(sc: SparkContext) = this(sc, sc.conf.get(config.MAX_TASK_FAILURES))

val conf = sc.conf

private val taskAssigner: TaskAssigner = TaskAssigner.init(conf)
Member

There are two spaces between private and val.

logInfo(s"$assignerName cannot be constructed, fallback to default $roundrobin.")
new RoundRobinAssigner()
}
assigner.withCpuPerTask(CPUS_PER_TASK)
Member

You can remove line 109 after making the following change:

assigner.withCpuPerTask(CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1))

}

override def offerAccepted(assigned: Boolean): Unit = {
if (currentOffer.coresAvailable < CPUS_PER_TASK || !assigned) {
Member

If the current offer is not assigned, why do we need to step to the next offer if coresAvailable is still sufficient?

Contributor

There are two cases:
a) There is no (or insufficient) locality information, in which case what you describe will hold:
all subsequent requests will also not result in assignment.

b) If there are other executors for which sufficient locality affinity holds, then a 'later' executor in the iteration order can satisfy the locality preference.

The assignment is eventually decided by TaskSetManager; the Assigner simply specifies the order in which the iteration proceeds.

Contributor Author

If the current offer is rejected, it is not valid for the current taskset (probably due to a locality restriction). Each scheduling algorithm has to respect the locality restriction and, at the same time, provide the next available offer to the taskset.
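
To make that division of labor concrete, a sketch of the interaction described above (the loop shape and the taskSet.resourceOffer call here are assumptions, not the PR's exact code):

taskAssigner.init()
while (taskAssigner.hasNext) {
  val state = taskAssigner.getNext()
  // TaskSetManager, not the assigner, decides whether this offer is usable
  // under the current locality level.
  val assigned = taskSet.resourceOffer(
    state.workOffer.executorId, state.workOffer.host, maxLocality).isDefined
  taskAssigner.offerAccepted(assigned)
}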


override def construct(workOffer: Seq[WorkerOffer]): Unit = {
offer = Random.shuffle(workOffer.map(o => new OfferState(o)))
}
Member

As you will put offers into the PriorityQueue, is it still necessary to do shuffling?

Contributor Author

This came from @mridulm's comments on the last PR. I think it is reasonable, but I don't have a concrete answer in mind.

"Would be good to shuffle workOffset's for this class too.
Practically, this ensures that initial heap will be randomized when cores are the same. "

Member

It sounds correct. However, I don't think it has a real effect. If the cores are all the same, it means no task got assigned in the previous run, so it doesn't matter whether we begin with a different order of offers.

Contributor Author

After talking to several other people, I find they don't feel the shuffle is strongly needed. @mridulm, if you don't mind, I will remove it in my next patch.

Contributor

@zhzhan Can you elaborate on what the concerns with shuffling are?
There were various reasons why we started shuffling offers.

Contributor Author

@mridulm I am not sure how much the shuffle can impact the scheduling, and thus I don't have a strong opinion on this.
@viirya Even if the cores are the same, it does not mean that "no task gets assigned in previous run". Shuffling does take effect here. For example, the previous round may be (5, 4, 3); if one core is allocated, the current round would be (4, 4, 3).

Contributor

@zhzhan Let us preserve the behavior: for any application using this assigner, all tasksets will be executed based on the ordering of offers (both with and without good locality info).
The impact can be fairly nontrivial, which is why shuffling was initially added.
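
A sketch of why the randomization matters for the heap (illustrative; the PriorityQueue ordering is assumed from the code under review):

import scala.collection.mutable.PriorityQueue
import scala.util.Random

// Offers with equal cores compare as ties; shuffling first randomizes which
// of them is dequeued first, avoiding a fixed per-executor bias.
val byCores = Ordering.by[OfferState, Int](_.coresAvailable)
val maxHeap = new PriorityQueue[OfferState]()(byCores)
Random.shuffle(workOffer.map(o => new OfferState(o))).foreach(maxHeap.enqueue(_))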

* Assign the task to workers with the most available cores.
*/
class BalancedAssigner extends TaskAssigner {
private var maxHeap: PriorityQueue[OfferState] = _
Member

Use val and call clear in init below?


override def reset(): Unit = {
super.reset
maxHeap = null
Member

As pointed above, call clear?
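
A sketch of the suggested change (the shape is an assumption, combining both comments above): keep the queue as a val and clear it rather than nulling it out.

import scala.collection.mutable.PriorityQueue

private val maxHeap =
  new PriorityQueue[OfferState]()(Ordering.by[OfferState, Int](_.coresAvailable))

override def init(): Unit = {
  maxHeap.clear() // release the previous round's state instead of maxHeap = null
  offer.filter(_.coresAvailable >= cpuPerTask).foreach(maxHeap.enqueue(_))
}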

* other jobs by underling resource manager. This assigner can potentially reduce the resource
* reservation for a job.
*/
class PackedAssigner extends TaskAssigner {
Member

As this assigner will try to pack as many tasks as possible into the same worker, the concern would be increasing memory pressure on that worker. Have you experienced such an issue in your practical usage?

Contributor Author

Your concern is valid. Each scheduling algorithm has its pros and cons, and which one is chosen depends on the user's requirements. We mainly want to use this to save reserved resources, combined with dynamic allocation. In our pipeline, we didn't observe the problem. If it happens, we need to investigate the memory allocation part to see whether it has a problem.


// Invoked at the end of resource offering to release internally maintained resources.
// Subclass is responsible to release its own private resources.
def reset(): Unit = {
Member

Do we need reset()? It looks like we only need init(). As we will call init before each assignment, it should be sufficient for resetting the status to the initial state.

Contributor Author

My concern is that if we do not use reset, the assigner has to keep its internal resources until the next time; but it is not a big overhead.

Member

Looks like you don't have a big object that poses a serious concern here.

class OfferState(val workOffer: WorkerOffer) {
// The current remaining cores that can be allocated to tasks.
var coresAvailable: Int = workOffer.cores
// The list of tasks that are assigned to this worker.
Contributor

"this worker" ? this wont represent a worker here.

* TaskAssigner is the base class for all task assigner implementations, and can be
* extended to implement different task scheduling algorithms.
* Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner
* is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested
Contributor

nit: requested => requests

import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

/** Tracking the current state of the workers with available cores and assigned task list. */
Contributor

nit: Tracking => Tracks

* is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested
* to perform task assignment given available workers, first sorts the candidate tasksets,
* and then for each taskset, it takes a number of rounds to request TaskAssigner for task
* assignment with different the locality restrictions until there is either no qualified
Contributor

nit: different the locality restrictions => different locality restrictions

*
* TaskAssigner is responsible to maintain the worker availability state and task assignment
* information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]]
* and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to
Contributor

It might be better to put these into separate points rather than a paragraph. Also, I am not sure what the protocol is about putting details like method names in the doc. As things stand, it will serve well for people trying to read the code, but as the codebase evolves, things might get out of sync if this comment is not updated.
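
For reference, the contract split into separate points, sketched as a trait (the signatures are taken from the hunks quoted in this review; the inline notes are paraphrases, not the PR's doc):

trait TaskAssigner {
  // 1. Called once per resource-offering round with the fresh worker offers.
  def construct(workOffer: Seq[WorkerOffer]): Unit
  // 2. Called before each round of task assignment for a taskset.
  def init(): Unit
  // 3/4. Iterate the candidate offers in the assigner's preferred order.
  def hasNext: Boolean
  def getNext(): OfferState
  // 5. Feedback so the assigner can drop or re-enqueue the current offer.
  def offerAccepted(assigned: Boolean): Unit
}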

<td><code>spark.scheduler.taskAssigner</code></td>
<td>roundrobin</td>
<td>
The strategy of how to allocate tasks among workers with free cores. By default, roundrobin
Contributor

nit: create a list for each policy and explain inline instead of saying former, latter below.

@@ -60,7 +58,7 @@ private[spark] class TaskSchedulerImpl(
def this(sc: SparkContext) = this(sc, sc.conf.get(config.MAX_TASK_FAILURES))

val conf = sc.conf

private val taskAssigner: TaskAssigner = TaskAssigner.init(conf)
Contributor

nit: extra space in private val

val host = shuffledOffers(i).host
if (availableCpus(i) >= CPUS_PER_TASK) {
taskAssigner.init()
while(taskAssigner.hasNext) {
Contributor

nit: space after while

}

override def offerAccepted(assigned: Boolean): Unit = {
if (currentOffer.coresAvailable >= CPUS_PER_TASK && assigned) {
Contributor

Shouldn't you add it to the heap anyway, regardless of what assigned is set to?

Contributor Author

If it is rejected, it is no longer valid for this round of assignment for this specific task set, because that means it is not valid for any of the tasks in the task set.

}

override def offerAccepted(assigned: Boolean): Unit = {
if (currentOffer.coresAvailable < CPUS_PER_TASK || !assigned) {
Contributor

Same as before: shouldn't you add it to the heap anyway, regardless of what assigned is set to?

Contributor Author

Similar to the reason above: if the offer is rejected, we have to move forward.

<td>
The strategy of how to allocate tasks among workers with free cores. By default, roundrobin
with randomness is used, which tries to allocate task to workers with available cores in
roundrobin manner. In addition, packed and balanced is provided. The former tries to
Member

nit: packed and balanced are provided.

@SparkQA

SparkQA commented Oct 20, 2016

Test build #67224 has finished for PR 15541 at commit e81b279.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


override def init(): Unit = {
currentOfferIndex = 0
sorted = offer.filter(_.coresAvailable >= cpuPerTask).sortBy(_.coresAvailable)
Member

@viirya viirya Oct 20, 2016

Sounds like the reasoning for shuffling in BalancedAssigner applies here too? With shuffling, the initially sorted offers would be randomized when cores are the same, right?

Contributor

@viirya Agree, good point.
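
A sketch of the agreed change (an assumption, not the PR's final code): shuffle before the sort, relying on sortBy being stable so that ties on coresAvailable end up in random order.

import scala.util.Random

override def init(): Unit = {
  currentOfferIndex = 0
  // Shuffle first; the stable sortBy then leaves equal-core offers in random order.
  sorted = Random.shuffle(offer).filter(_.coresAvailable >= cpuPerTask)
    .sortBy(_.coresAvailable)
}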

@SparkQA

SparkQA commented Oct 20, 2016

Test build #67290 has finished for PR 15541 at commit 945d623.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zhzhan
Contributor Author

zhzhan commented Oct 21, 2016

@rxin Can you please take a look, and let me know if you have any concerns?

@gatorsmile
Member

Accidentally, I deleted all my comments. You might need to check the emails to find them. :)

import org.apache.spark.util.Utils

/** Tracks the current state of the workers with available cores and assigned task list. */
class OfferState(val workOffer: WorkerOffer) {
Contributor

Is this class private to the scheduler?

@zhzhan
Contributor Author

zhzhan commented Oct 21, 2016

@gatorsmile I didn't see your new comments.

* extended to implement different task scheduling algorithms.
* Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner
* is used to assign tasks to workers with available cores. Internally, when TaskScheduler
* perform task assignment given available workers, it first sorts the candidate tasksets,
Contributor

perform -> performs

* First, TaskScheduler invokes construct() of TaskAssigner to initialize the its internal
* worker states at the beginning of resource offering.
*
* Second, before each round of task assignment for a taskset, TaskScheduler invoke the init()
Contributor

invoke -> invokes


/**
* Tests Whether there is offer available to be used inside of one round of Taskset assignment.
* @return `true` if a subsequent call to `next` will yield an element,
Contributor

@return is not aligned with the line above.

def init(): Unit

/**
* Tests Whether there is offer available to be used inside of one round of Taskset assignment.
Contributor

Whether -> whether

}

/**
* Assign the task to workers with the most available cores. It other words, BalancedAssigner tries
Contributor

It -> In

* Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that
* the assigner can decide whether the current worker is valid for the next offering.
*/
def offerAccepted(assigned: Boolean): Unit
Contributor

I remember that in your original PR there was a resource release method. Do you still need it?

Contributor Author

Based on the review comments, we do not need it anymore. The resources will be released in the init method.

The strategy of how to allocate tasks among workers with free cores. There are three task
assigners (roundrobin, packed, and balanced) are supported currently. By default, roundrobin
with randomness is used, which tries to allocate task to workers with available cores in
roundrobin manner.The packed task assigner tries to allocate tasks to workers with the least
Contributor

Missing space between . and The

@SparkQA

SparkQA commented Oct 22, 2016

Test build #67395 has finished for PR 15541 at commit dd2b207.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 24, 2016

Test build #67428 has finished for PR 15541 at commit a820e96.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zhzhan
Contributor Author

zhzhan commented Oct 25, 2016

@rxin Would you like to take a look and let me know if you have any concerns? Thanks.

@gatorsmile
Member

retest this please

@rxin
Contributor

rxin commented Nov 1, 2016

Sure will take a look in the next couple of days to get this into 2.1 if possible.

* workers' memory pressure as less tasks running on the same workers, which also indicates that
* the task itself can make use of more computation resources, e.g., hyper-thread, across clusters.
*/
class BalancedAssigner extends TaskAssigner {
Contributor

What's the difference between this and packed? Wouldn't they look similar? Why would anyone use one over the other?

Contributor Author

@zhzhan zhzhan Nov 1, 2016

@rxin These two do the opposite thing. The packed assigner tries to schedule tasks onto as few workers as possible, so that workers with no tasks running can be released. The balanced assigner tries to schedule tasks onto the workers with the least workload.

If a user wants optimal resource reservation, they may want the packed assigner. If a user observes memory pressure, they may want to try the balanced assigner.
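
An illustrative example with hypothetical numbers: suppose three executors have (5, 4, 3) available cores and a single 1-core task arrives.

// PackedAssigner     -> offers the 3-core executor first (pack tightly, so the
//                       other executors can drain and be released)
// BalancedAssigner   -> offers the 5-core executor first (spread the load evenly)
// RoundRobinAssigner -> offers whichever executor is next in (shuffled) order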

Contributor

Sorry, I made a mistake -- I meant to ask the difference between balanced and round robin. Aren't the two similar?

Contributor Author

These two assigners may behave similarly in practice. The difference is that the balanced assigner tries to distribute the workload more aggressively.

@SparkQA

SparkQA commented Nov 1, 2016

Test build #67863 has finished for PR 15541 at commit a820e96.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val className = {
val name = assignerMap.get(assignerName.toLowerCase())
name.getOrElse {
logWarning(s"$assignerName cannot be constructed, fallback to default $roundrobin.")
Contributor

I'd actually fail Spark if it cannot be constructed -- otherwise it is easier to make mistakes.
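
A sketch of the fail-fast alternative being suggested (an assumption; the exception type and message are illustrative):

val className = assignerMap.getOrElse(assignerName.toLowerCase,
  throw new IllegalArgumentException(
    s"Unknown value '$assignerName' for spark.scheduler.taskAssigner; " +
      s"expected one of: ${assignerMap.keys.mkString(", ")}"))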

@rxin
Contributor

rxin commented Nov 1, 2016

To be honest, I find the current API pretty weird (it is some stateful object that has to be reset every time). I suspect you designed this API by just abstracting out the logic you wanted to change from the existing implementation, but that doesn't necessarily lead to intuitive apis. It's been a while since I last checked the scheduler code, so it'd take me a while to page back in.

<td>roundrobin</td>
<td>
The strategy of how to allocate tasks among workers with free cores. Three task
assigners (roundrobin, packed, and balanced) are supported currently. By default, roundrobin
Contributor

Nit: I suggest double-quoting the keywords "roundrobin", "packed", and "balanced" in this paragraph. E.g., the "balanced" task assigner sounds better to me than the balanced task assigner.

@@ -305,12 +307,8 @@ private[spark] class TaskSchedulerImpl(
hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
}
}
taskAssigner.construct(offers)
Contributor

The comments on the resourceOffers method should be updated. They still say "We fill each node with tasks in a round-robin manner so that tasks are balanced across the cluster."

@zhzhan
Contributor Author

zhzhan commented Nov 1, 2016

@rxin Thanks for the feedback regarding the TaskAssigner API. The current API is designed based on the current logic of TaskSchedulerImpl, where the scheduler takes many rounds to assign the tasks for each task set. I have not figured out a better way yet. Any suggestions are welcome.

@SparkQA

SparkQA commented Nov 1, 2016

Test build #67924 has finished for PR 15541 at commit b06de5e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 22, 2016

Test build #69015 has finished for PR 15541 at commit ada2a45.

  • This patch fails from timeout after a configured wait of `250m`.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 7, 2017

Test build #72506 has finished for PR 15541 at commit ada2a45.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@jiangxb1987
Contributor

@rxin @zhzhan Is there any chance that we could reach any consensus on the API design and move forward with this PR?

@SparkQA

SparkQA commented Oct 22, 2018

Test build #97708 has started for PR 15541 at commit ada2a45.

@zhzhan zhzhan closed this Oct 22, 2018
@SparkQA

SparkQA commented Oct 22, 2018

Test build #97724 has started for PR 15541 at commit ada2a45.

@SparkQA

SparkQA commented Oct 22, 2018

Test build #97780 has started for PR 15541 at commit ada2a45.

@AmplabJenkins

Build finished. Test FAILed.
